package mapreduce

import (
	"fmt"
	"sync"
)

//
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var n_other int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		n_other = nReduce
	case reducePhase:
		ntasks = nReduce
		n_other = len(mapFiles)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

	// All ntasks tasks have to be scheduled on workers. Once all tasks
	// have completed successfully, schedule() should return.
	//
	// Your code here (Part III, Part IV).
	//
	var wg sync.WaitGroup
	lock := &sync.Mutex{}
	// Part 3实现1：使用两个channel处理可用的woker：
	//	registerChan用于生成启动后注册到master的worker,
	//	avaiChan用于保存已经注册的执行完任务的可再次调度的worker。
	//avaiChan := make(chan string, 1)
	//for i := 0; i < ntasks; {
	//	select {
	//		case rwk := <- registerChan:
	//			fmt.Printf("register worker: %s available\n", rwk)
	//			wg.Add(1)
	//			doTaskArgs := DoTaskArgs{ JobName:jobName, File:mapFiles[i], Phase:phase,
	//				TaskNumber:i, NumOtherPhase:n_other}
	//			go func(){
	//				ok := call(rwk, "Worker.DoTask", doTaskArgs, nil)
	//				if ok == false {
	//					log.Fatalf("DoTask: RPC %s doTask error\n", rwk)
	//				}
	//				wg.Done()
	//				// tasks执行完毕后将worker重新写入registerChan，以便重新调度
	//				//registerChan <- worker
	//				avaiChan <- rwk
	//			} ()
	//			i++
	//		case awk := <- avaiChan:
	//			fmt.Printf("done worker: %s available\n", awk)
	//			wg.Add(1)
	//			doTaskArgs := DoTaskArgs{ JobName:jobName, File:mapFiles[i], Phase:phase,
	//				TaskNumber:i, NumOtherPhase:n_other}
	//			go func(){
	//				ok := call(awk, "Worker.DoTask", doTaskArgs, nil)
	//				if ok == false {
	//					log.Fatalf("DoTask: RPC %s doTask error\n", awk)
	//				}
	//				wg.Done()
	//				// tasks执行完毕后将worker重新写入registerChan，以便重新调度
	//				//registerChan <- worker
	//				avaiChan <- awk
	//			} ()
	//			i++
	//	}
	//}

	// Part 3实现2：只使用registerChan，当worker准备好后向master注册后产生worker，
	//				当已经注册过的worker执行完任务后再次写入registerChan，以便重新调度。
	tasks := make([]int, ntasks)
	for i := 0; i < ntasks; i++ {
		tasks[i] = i
	}
	for {
		// 判断是否还有待处理的任务。如果没有，跳出循环；否则，取出下一个任务。
		lock.Lock()
		if len(tasks) <= 0 {
			lock.Unlock()
			break
		}
		task := tasks[0]
		tasks = append(tasks[:0], tasks[1:]...)
		lock.Unlock()

		worker := <- registerChan
		wg.Add(1)
		doTaskArgs := DoTaskArgs{ JobName:jobName, File:mapFiles[task], Phase:phase,
			TaskNumber:task, NumOtherPhase:n_other}
		go func(){
			ok := call(worker, "Worker.DoTask", doTaskArgs, nil)
			if ok == false {	// worker故障
				// 将任务重新加入任务列表
				lock.Lock()
				tasks = append(tasks, doTaskArgs.TaskNumber)
				lock.Unlock()
				fmt.Printf("DoTask: RPC %s doTask error\n", worker)
				wg.Done()
			} else {
				wg.Done()
				// tasks执行完毕后将worker重新写入registerChan，以便重新调度
				registerChan <- worker
			}
		} ()
	}
	wg.Wait()

	fmt.Printf("Schedule: %v done\n", phase)
}
